/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.utils.datastructure;

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.MathUtils;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE;
import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;

public abstract class TVList implements WALEntryValue {

    protected static final int SMALL_ARRAY_LENGTH = 32;
    protected static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent";
    protected static final long targetChunkSize =
            IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
    // list of timestamp array, add 1 when expanded -> data point timestamp array
    // index relation: arrayIndex -> elementIndex
    protected List<long[]> timestamps;
    protected int rowCount;

    protected boolean sorted = true;
    protected long maxTime;
    // record reference count of this tv list
    // currently this reference will only be increase because we can't know when to decrease it
    protected AtomicInteger referenceCount;
    private long version;

    public TVList() {
        timestamps = new ArrayList<>();
        rowCount = 0;
        maxTime = Long.MIN_VALUE;
        referenceCount = new AtomicInteger();
    }

    public static TVList newList(TSDataType dataType) {
        switch (dataType) {
            case TEXT:
                return BinaryTVList.newList();
            case FLOAT:
                return FloatTVList.newList();
            case INT32:
                return IntTVList.newList();
            case INT64:
                return LongTVList.newList();
            case DOUBLE:
                return DoubleTVList.newList();
            case BOOLEAN:
                return BooleanTVList.newList();
            default:
                break;
        }
        return null;
    }

    public static long tvListArrayMemCost(TSDataType type) {
        long size = 0;
        // time array mem size
        size += (long) PrimitiveArrayManager.ARRAY_SIZE * 8L;
        // value array mem size
        size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize();
        // two array headers mem size
        size += NUM_BYTES_ARRAY_HEADER * 2L;
        // Object references size in ArrayList
        size += NUM_BYTES_OBJECT_REF * 2L;
        return size;
    }

    public boolean isSorted() {
        return sorted;
    }

    public abstract void sort();

    public void increaseReferenceCount() {
        referenceCount.incrementAndGet();
    }

    public int getReferenceCount() {
        return referenceCount.get();
    }

    public int rowCount() {
        return rowCount;
    }

    public long getTime(int index) {
        if (index >= rowCount) {
            throw new ArrayIndexOutOfBoundsException(index);
        }
        int arrayIndex = index / ARRAY_SIZE;
        int elementIndex = index % ARRAY_SIZE;
        return timestamps.get(arrayIndex)[elementIndex];
    }

    public void putLong(long time, long value) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public void putInt(long time, int value) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public void putFloat(long time, float value) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public void putDouble(long time, double value) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public void putBinary(long time, Binary value) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public boolean reachMaxChunkSizeThreshold() {
        return false;
    }

    public void putBoolean(long time, boolean value) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public void putAlignedValue(long time, Object[] value, int[] columnIndexArray) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public void putLongs(long[] time, long[] value, BitMap bitMap, int start, int end) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public void putInts(long[] time, int[] value, BitMap bitMap, int start, int end) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public void putFloats(long[] time, float[] value, BitMap bitMap, int start, int end) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public void putDoubles(long[] time, double[] value, BitMap bitMap, int start, int end) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public void putBinaries(long[] time, Binary[] value, BitMap bitMap, int start, int end) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public void putBooleans(long[] time, boolean[] value, BitMap bitMap, int start, int end) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public void putAlignedValues(
            long[] time, Object[] value, BitMap[] bitMaps, int[] columnIndexArray, int start, int end) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public long getLong(int index) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public int getInt(int index) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public float getFloat(int index) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public double getDouble(int index) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public Binary getBinary(int index) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public boolean getBoolean(int index) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public Object getAlignedValue(int index) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public TVList getTvListByColumnIndex(
            List<Integer> columnIndexList, List<TSDataType> dataTypeList) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public int getValueIndex(int index) {
        throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
    }

    public long getMaxTime() {
        return maxTime;
    }

    public long getVersion() {
        return version;
    }

    protected abstract void set(int src, int dest);

    protected abstract void expandValues();

    @Override
    public abstract TVList clone();

    public TVList clone(long version) {
        this.version = version;
        return clone();
    }

    protected abstract void releaseLastValueArray();

    protected void releaseLastTimeArray() {
        PrimitiveArrayManager.release(timestamps.remove(timestamps.size() - 1));
    }

    public int delete(long lowerBound, long upperBound) {
        int newSize = 0;
        maxTime = Long.MIN_VALUE;
        for (int i = 0; i < rowCount; i++) {
            long time = getTime(i);
            if (time < lowerBound || time > upperBound) {
                set(i, newSize++);
                maxTime = Math.max(time, maxTime);
            }
        }
        int deletedNumber = rowCount - newSize;
        rowCount = newSize;
        // release primitive arrays that are empty
        int newArrayNum = newSize / ARRAY_SIZE;
        if (newSize % ARRAY_SIZE != 0) {
            newArrayNum++;
        }
        int oldArrayNum = timestamps.size();
        for (int releaseIdx = newArrayNum; releaseIdx < oldArrayNum; releaseIdx++) {
            releaseLastTimeArray();
            releaseLastValueArray();
        }
        return deletedNumber;
    }

    protected void cloneAs(TVList cloneList) {
        for (long[] timestampArray : timestamps) {
            cloneList.timestamps.add(cloneTime(timestampArray));
        }
        cloneList.rowCount = rowCount;
        cloneList.sorted = sorted;
        cloneList.maxTime = maxTime;
    }

    public void clear() {
        rowCount = 0;
        sorted = true;
        maxTime = Long.MIN_VALUE;
        clearTime();
        clearValue();
    }

    protected void clearTime() {
        if (timestamps != null) {
            for (long[] dataArray : timestamps) {
                PrimitiveArrayManager.release(dataArray);
            }
            timestamps.clear();
        }
    }

    abstract void clearValue();

    protected void checkExpansion() {
        if ((rowCount % ARRAY_SIZE) == 0) {
            expandValues();
            timestamps.add((long[]) getPrimitiveArraysByType(TSDataType.INT64));
        }
    }

    protected Object getPrimitiveArraysByType(TSDataType dataType) {
        return PrimitiveArrayManager.allocate(dataType);
    }

    protected long[] cloneTime(long[] array) {
        long[] cloneArray = new long[array.length];
        System.arraycopy(array, 0, cloneArray, 0, array.length);
        return cloneArray;
    }

    void updateMaxTimeAndSorted(long[] time, int start, int end) {
        int length = time.length;
        long inPutMinTime = Long.MAX_VALUE;
        boolean inputSorted = true;
        for (int i = start; i < end; i++) {
            inPutMinTime = Math.min(inPutMinTime, time[i]);
            maxTime = Math.max(maxTime, time[i]);
            if (inputSorted && i < length - 1 && time[i] > time[i + 1]) {
                inputSorted = false;
            }
        }
        sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1));
    }

    /**
     * for log
     */
    public abstract TimeValuePair getTimeValuePair(int index);

    protected abstract TimeValuePair getTimeValuePair(
            int index, long time, Integer floatPrecision, TSEncoding encoding);

    @TestOnly
    public TsBlock buildTsBlock() {
        return buildTsBlock(0, TSEncoding.PLAIN, null);
    }

    public TsBlock buildTsBlock(
            int floatPrecision, TSEncoding encoding, List<TimeRange> deletionList) {
        TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(this.getDataType()));
        writeValidValuesIntoTsBlock(builder, floatPrecision, encoding, deletionList);
        return builder.build();
    }

    protected abstract void writeValidValuesIntoTsBlock(
            TsBlockBuilder builder,
            int floatPrecision,
            TSEncoding encoding,
            List<TimeRange> deletionList);

    protected boolean isPointDeleted(
            long timestamp, List<TimeRange> deletionList, Integer deleteCursor) {
        while (deletionList != null && deleteCursor < deletionList.size()) {
            if (deletionList.get(deleteCursor).contains(timestamp)) {
                return true;
            } else if (deletionList.get(deleteCursor).getMax() < timestamp) {
                deleteCursor++;
            } else {
                return false;
            }
        }
        return false;
    }

    protected float roundValueWithGivenPrecision(
            float value, int floatPrecision, TSEncoding encoding) {
        if (!Float.isNaN(value) && (encoding == TSEncoding.RLE || encoding == TSEncoding.TS_2DIFF)) {
            return MathUtils.roundWithGivenPrecision(value, floatPrecision);
        }
        return value;
    }

    protected double roundValueWithGivenPrecision(
            double value, int floatPrecision, TSEncoding encoding) {
        if (!Double.isNaN(value) && (encoding == TSEncoding.RLE || encoding == TSEncoding.TS_2DIFF)) {
            return MathUtils.roundWithGivenPrecision(value, floatPrecision);
        }
        return value;
    }

    public abstract TSDataType getDataType();

    public static TVList deserialize(DataInputStream stream) throws IOException {
        TSDataType dataType = ReadWriteIOUtils.readDataType(stream);
        switch (dataType) {
            case TEXT:
                return BinaryTVList.deserialize(stream);
            case FLOAT:
                return FloatTVList.deserialize(stream);
            case INT32:
                return IntTVList.deserialize(stream);
            case INT64:
                return LongTVList.deserialize(stream);
            case DOUBLE:
                return DoubleTVList.deserialize(stream);
            case BOOLEAN:
                return BooleanTVList.deserialize(stream);
            case VECTOR:
                return AlignedTVList.deserialize(stream);
            default:
                break;
        }
        return null;
    }
}
